package com.huan.table

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

object FileOutPutTest {
  def main(args: Array[String]): Unit = {
    //1.创建环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //表环境
    val tableEnv = StreamTableEnvironment.create(env)

    //获取路径
    val filePath = "E:\\Project\\FlinkTutorials\\Flink-Scala\\src\\main\\resources\\sensor.txt"

    tableEnv.connect(new FileSystem().path(filePath)) //定义来源
      .withFormat(new Csv()) //设置格式CSV
      .withSchema(new Schema() //设置表结构
        .field("id", DataTypes.STRING())
        .field("timestamp",DataTypes.BIGINT())
        .field("temperature",DataTypes.DOUBLE()))
      .createTemporaryTable("inputTable") //创建临时表

    //转换操作
    val sensorTable = tableEnv.from("inputTable")
    val resultTable = sensorTable.select('id,'temperature)
      .filter('id === "sensor_1")


    //聚合转换
    val aggTable = sensorTable
      .groupBy('id) //基于id 分组
      .select('id,'id.count as 'count)


    //输出到文件
    //注册输出表
    val outPath = "E:\\Project\\FlinkTutorials\\Flink-Scala\\src\\main\\resources\\output.txt"

    tableEnv.connect(new FileSystem().path(outPath))
        .withFormat(new Csv())
        .withSchema(new Schema()
          .field("id",DataTypes.STRING())
          .field("temperature",DataTypes.DOUBLE())
        ).createTemporaryTable("outputTable")

    //输出表(文件)
    resultTable.insertInto("outputTable")

//    resultTable.toAppendStream[(String,Double)].print("result")
//    //出现的结果有True和False 如果发现有false,说明数据作废
//    aggTable.toRetractStream[(String,Long)].print("agg")

    env.execute("FileOutPutTest")
  }
}
